-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16518: Implement KIP-853 flags for storage-tool.sh #16669
Conversation
d5ea780
to
b70fa60
Compare
e3f496f
to
6332513
Compare
c1a4fe9
to
dcd9ea9
Compare
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh. There are currently two valid ways to format a cluster: - The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0. - The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1. This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)
dcd9ea9
to
ab2c8a5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @cmccabe. Wanted to give a review withe more important feedback.
reconfigurableQuorumOptions.addArgument("--standalone", "-s"). | ||
help("Used to initialize a single-node quorum controller quorum."). | ||
action(storeTrue()) | ||
reconfigurableQuorumOptions.addArgument("--initial-voters", "-I"). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KIP suggest --controller-quorum-voters
for this flag. I suggested this name because it matches the existing configuration property controller.quorum.voters
. It is good to have controller is the name as that is a concept the user is familiar. Voter is an internal KRaft concept that we rarely expose to end users. If you want to keep the word "initial" maybe we can name this flag --controller-quorum-initial-voters
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KIP suggest --controller-quorum-voters for this flag. I suggested this name because it matches the existing configuration property controller.quorum.voters. It is good to have controller is the name as that is a concept the user is familiar
controller.quorum.voters
is a separate configuration (which I think you said you wanted to deprecate eventually) so I think naming the flag the same thing would be highly confusing!
maybe we can name this flag --controller-quorum-initial-voters. What do you think
I think that's too long. --initial-voters
seems short and descriptive, so to be honest I think we should stick with that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about --initial-controllers
? I am concerned that the user many not know what a voter is. For example, for adding and removing controllers, we have kafka-metadata-quorum add-controller|remove-controller
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. --initial-controllers
makes sense.
help("A list of controller quorum voter ids, directories, and hostname:port pairs. For example:\n" + | ||
"0-JEXY6aqzQY-32P5TStzaFg@localhost:8082,1-MvDxzVmcRsaTz33bUuRU6A@localhost:8083,2-07R5amHmR32VDA6jHkGbTA@localhost:8084\n"). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about mentioning in the description that the same value must be used in all of the controller nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I added this text:
The same values must be used to format all nodes.
String directoryIdString = input.substring(endColonIndex + 1); | ||
Uuid directoryId; | ||
try { | ||
directoryId = Uuid.fromString(directoryIdString); | ||
} catch (IllegalArgumentException e) { | ||
throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to parse the following schema <id>@<host>:<port>:<directoryId>
. This doesn't match the description in the help string for --initial-voters
or the description documented in the KIP. Was this change intentional? If so, why?
To me <id>-<directoryId>@<host>:<port>
is clearer conceptually since <host>:<port>
is a known endpoint format used by URL. I also think that this schema reads a bit better: "voter 1 with directory id ## at localhost and port 1234."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it this way because having a UUID in the middle of the whole thing kills readability for me.
When I look at:
0@localhost:8082:JEXY6aqzQY-32P5TStzaFg
I can first see
0@localhost:8082
Which makes sense and is familiar. And then I see the stuff at the end.
When I look at:
0-JEXY6aqzQY-32P5TStzaFg@localhost:8082
It's not visually clear whether the UUID is 0-JEXY6aqzQY-32P5TStzaFg
or -JEXY6aqzQY-32P5TStzaFg
, or what.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. You convinced me. replica id, host and port are more human readable so maybe they should come first in the string. In summary, we are going to keep the current schema of <id>@<host>:<port>:<directoryId>
. Can you update the description for --initial-voters
to match the implemented schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
public static Endpoints fromMap(Map<ListenerName, InetSocketAddress> endpoints) { | ||
return new Endpoints(new HashMap<>(endpoints)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this "constructor". Checkout fromInetSocketAddress
. Feel free to change it to copy the map instead of reusing the same map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, I don't know how I missed that. I will use that then.
try (FileRawSnapshotWriter writer = FileRawSnapshotWriter.create( | ||
clusterMetadataDirectory.toPath(), new OffsetAndEpoch(0, 0))) { | ||
new RecordsSnapshotWriter.Builder(). | ||
setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()). | ||
setMaxBatchSize(KafkaRaftClient.MAX_BATCH_SIZE_BYTES). | ||
setRawSnapshotWriter(writer). | ||
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)). | ||
setVoterSet(Optional.of(voterSet)). | ||
build(new MetadataRecordSerde()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a little subtle but I have a few observations. The created RecordsSnapshotWriter<?>
owns the passed FileRawSanpshotWriter
so it will close the raw snapshot write when the records snapshot writer is close. The writer to the snapshot must be finalized with freeze
before closing. We have a constant for the bootstrap snapshot id o.a.k.s.Snapshots.BOOTSTRAP_SNAPSHOT_ID
.
This means that his code should be changed to:
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
.setLastContainedLogTimestamp(Time.SYSTEM.milliseconds())
.setMaxBatchSize(KafkaRaftClient.MAX_BATCH_SIZE_BYTES)
.setRawSnapshotWriter(
FileRawSnapshotWriter.create(
clusterMetadataDirectory.toPath(),
Snapshots.BOOTSTRAP_SNAPSHOT_ID
)
)
.setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion))
.setVoterSet(Optional.of(voterSet));
try (RecordsSnapshotWriter writer = builder.build(new MetadataRecordSerde())) {
writer.freeze();
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good find. fixed.
import org.apache.kafka.raft.DynamicVoters; | ||
import org.apache.kafka.raft.KafkaRaftClient; | ||
import org.apache.kafka.raft.OffsetAndEpoch; | ||
import org.apache.kafka.raft.internals.VoterSet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense. I created an issue to move this from raft.internals
to raft
: https://issues.apache.org/jira/browse/KAFKA-17238
File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d", | ||
CLUSTER_METADATA_TOPIC_PARTITION.topic(), | ||
CLUSTER_METADATA_TOPIC_PARTITION.partition())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Unfortunate. This is computed by UnifiedLog.logDirName
but that is part of the core
module.
tests look clean aside from flakes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed all of the changes.
writeDynamicQuorumSnapshot(writeLogDir, | ||
initialControllers.get(), | ||
featureLevels.get(KRaftVersion.FEATURE_NAME), | ||
controllerListenerName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. So the kraft.version = 0 if and only if there is are no DynamicVoter
/VoterSet
and the tool doesn't write a 0-0.checkpoint
file. The kraft.version = 1 if and only if there are DynamicVoter
/VoterSet
and the tool writes a 0-0.checkpoint
file.
Is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's correct
} | ||
} | ||
|
||
// -> test that KIP-853 fails on older MV |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you planning to implement this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will implement
// Utils.delete(new File(logDir1)) | ||
// Utils.delete(new File(logDir2)) | ||
// } | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you planning to uncomment these tests? If not, let's remove them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove
//formatter.setFeatureLevel(Features.TRANSACTION_VERSION.featureName, | ||
// Features.TRANSACTION_VERSION.defaultValue(metadataVersion)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uncommented code? Let's remove this if it is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
public VoterSet.VoterNode toVoterNode(String controllerListenerName) { | ||
ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId); | ||
Endpoints listeners = Endpoints.fromInetSocketAddresses(Collections.singletonMap( | ||
new ListenerName(controllerListenerName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is safer to use ListenerName.normalized
.
SupportedVersionRange supportedKRaftVersion = | ||
new SupportedVersionRange((short) 0, (short) 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. I am adding SupportedVersionRange supportedVersionRange()
to o.a.k.s.c.Features
in #16735
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can change it at that point then
@cmccabe please take a look at the test results. There seem to be related failures. |
@cmccabe it looks like the tool is not grabbing the hostname correctly: $ git diff
diff --git a/config/kraft/server.properties b/config/kraft/server.properties
index dfa9489cbc..f34b18c889 100644
--- a/config/kraft/server.properties
+++ b/config/kraft/server.properties
@@ -28,6 +28,7 @@ node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
+unstable.feature.versions.enable=true
############################# Socket Server Settings ############################# bin/kafka-dump-log.sh --cluster-metadata-decoder --files /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000-0000000000.checkpoint
Dumping /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000-0000000000.checkpoint
Snapshot end offset: 0, epoch: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: true deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1722614412657 size: 160 magic: 2 compresscodec: none crc: 1500076082 isvalid: true
| offset: 0 CreateTime: 1722614412657 keySize: 4 valueSize: 11 sequence: -1 headerKeys: [] SnapshotHeader {"version":0,"lastContainedLogTimestamp":1722614412648}
| offset: 1 CreateTime: 1722614412657 keySize: 4 valueSize: 5 sequence: -1 headerKeys: [] KRaftVersion {"version":0,"kRaftVersion":1}
| offset: 2 CreateTime: 1722614412657 keySize: 4 valueSize: 50 sequence: -1 headerKeys: [] KRaftVoters {"version":0,"voters":[{"voterId":1,"voterDirectoryId":"Xf-d77HEShOwt6TTMEVApQ","endpoints":[{"name":"CONTROLLER","host":"null","port":9093}],"kRaftVersionFeature":{"minSupportedVersion":0,"maxSupportedVersion":1}}]}
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: true deleteHorizonMs: OptionalLong.empty position: 160 CreateTime: 1722614412661 size: 75 magic: 2 compresscodec: none crc: 1648935618 isvalid: true
| offset: 3 CreateTime: 1722614412661 keySize: 4 valueSize: 3 sequence: -1 headerKeys: [] SnapshotFooter {"version":0} Notice that the host field in the voter set control records is set to |
Fixed the test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM if the test pass.
We are going to fix this issue in another PR: #16669 (comment)
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh. There are currently two valid ways to format a cluster: The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0. The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1. This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.) Reviewers: José Armando García Sancio <[email protected]>
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh. There are currently two valid ways to format a cluster: The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0. The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1. This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.) Reviewers: José Armando García Sancio <[email protected]>
Fixes a regression introduced by #16669 which inadvertently stopped processing SCRAM arguments from kafka-storage.sh Reviewers: Colin P. McCabe <[email protected]>, Federico Valeri <[email protected]>
Fixes a regression introduced by #16669 which inadvertently stopped processing SCRAM arguments from kafka-storage.sh Reviewers: Colin P. McCabe <[email protected]>, Federico Valeri <[email protected]>
Fixes a regression introduced by apache#16669 which inadvertently stopped processing SCRAM arguments from kafka-storage.sh Reviewers: Colin P. McCabe <[email protected]>, Federico Valeri <[email protected]>
Fixes a regression introduced by apache#16669 which inadvertently stopped processing SCRAM arguments from kafka-storage.sh Reviewers: Colin P. McCabe <[email protected]>, Federico Valeri <[email protected]>
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh.
There are currently two valid ways to format a cluster:
The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.
The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.
This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)